package com.fwmagic.dynamic_rule.engine;

import com.fwmagic.dynamic_rule.bean.LogBean;
import com.fwmagic.dynamic_rule.bean.ResultBean;
import com.fwmagic.dynamic_rule.functions.*;
import com.fwmagic.dynamic_rule.utils.RuleStateDescUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * 动态规则引擎版本5
 * 近期数据查State,远期数据查ClickHouse,加入Cache(redis),跨时间点的条件分段查询
 * <p>
 * 加入Drools规则引擎，通过Canal订阅mysql的binlog,将规则模版发送到Kafka中，Drools
 * 来判断模版和事件数据是否匹配
 */
public class RuleEngineV5 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

//        env.enableCheckpointing(1000);

        //添加kafka数据源
        DataStreamSource<String> eventStream = env.addSource(SourceFunctions.createKafkaSource("yinew_applog"));

        //json数据转成LogBean
        SingleOutputStreamOperator<LogBean> mapedStream = eventStream.map(new Json2BeanMapFunction());

        //把deviceId作为Key进行分组
        //TODO，后续可以改造成 动态keyBy
        KeyedStream<LogBean, String> keyedStream = mapedStream.keyBy(new DeviceIdKeySelector());

        //读取规则的Topic
        DataStreamSource<String> rulesStream = env.addSource(SourceFunctions.createKafkaSource("yinew_rules"));

        //广播规则，以便每个subTask都能读取到
        BroadcastStream<String> broadcastStream = rulesStream.broadcast(RuleStateDescUtils.ruleKieStateDesc);

        //连接事件流和规则流
        BroadcastConnectedStream<LogBean, String> connectedStream = keyedStream.connect(broadcastStream);

        //规则引擎的核心匹配逻辑
        SingleOutputStreamOperator<ResultBean> resultStream = connectedStream.process(new RuleProcessFunctionV5());

        //匹配到的结果打印
        resultStream.print();

        //启动Flink任务
        env.execute("RuleEngineV5");
    }
}
